[Kafka]04 Kafka入门 四

Kafka 原生的 API、Spring Boot 使用 Kafka

Posted by 李玉坤 on 2017-11-24

使用 Kafka 原生的 API

①消费者自动提交

定义自己的生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;


public class MyKafkaProducer {
private org.apache.kafka.clients.producer.KafkaProducer<Integer, String> producer;

public MyKafkaProducer() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 设置批量发送
properties.put("batch.size", 16384);
// 批量发送的等待时间50ms, 超过50ms, 不足批量大小也发送
properties.put("linger.ms", 50);
this.producer = new org.apache.kafka.clients.producer.KafkaProducer<Integer, String>(properties);
}

public boolean sendMsg() {
boolean result = true;
try {
// 正常发送, test2是topic, 0代表的是分区, 1代表的是key, hello world是发送的消息内容
final ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("test2", 0, 1, "hello world");
producer.send(record);
// 有回调函数的调用
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println(recordMetadata.topic());
System.out.println(recordMetadata.partition());
System.out.println(recordMetadata.offset());
}
});
// 自己定义一个类
producer.send(record, new MyCallback(record));
} catch (Exception e) {
result = false;
}
return result;
}
}

定义生产者发送成功的回调函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;


public class MyCallback implements Callback {
private Object msg;

public MyCallback(Object msg) {
this.msg = msg;
}

@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
System.out.println("topic = " + metadata.topic());
System.out.println("partiton = " + metadata.partition());
System.out.println("offset = " + metadata.offset());
System.out.println(msg);
}
}

生产者测试类:在生产者测试类中,自己遇到一个坑,就是最后自己没有加 sleep,就是怎么检查自己的代码都没有问题,但是最后就是没法发送成功消息,最后加了一个 sleep 就可以了。

因为主函数 main 已经执行完退出,但是消息并没有发送完成,需要进行等待一下。当然,你在生产环境中可能不会遇到这样问题,呵呵!

代码如下:

1
2
3
4
5
6
7
8
9
10
11
import static java.lang.Thread.sleep;


public class MyKafkaProducerTest {
public static void main(String[] args) throws InterruptedException {
MyKafkaProducer producer = new MyKafkaProducer();
boolean result = producer.sendMsg();
System.out.println("send msg " + result);
sleep(1000);
}
}

消费者类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;


public class MyKafkaConsumer extends ShutdownableThread {

private KafkaConsumer<Integer, String> consumer;

public MyKafkaConsumer() {
super("KafkaConsumerTest", false);
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
properties.put("group.id", "mygroup");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("session.timeout.ms", "30000");
properties.put("heartbeat.interval.ms", "10000");
properties.put("auto.offset.reset", "earliest");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<Integer, String>(properties);
}

@Override
public void doWork() {
consumer.subscribe(Arrays.asList("test2"));
ConsumerRecords<Integer, String>records = consumer.poll(1000);
for (ConsumerRecord record : records) {
System.out.println("topic = " + record.topic());
System.out.println("partition = " + record.partition());
System.out.println("key = " + record.key());
System.out.println("value = " + record.value());
}
}
}

消费者的测试类:

1
2
3
4
5
6
7
8

public class MyConsumerTest {
public static void main(String[] args) {
MyKafkaConsumer consumer = new MyKafkaConsumer();
consumer.start();
System.out.println("==================");
}
}

②消费者同步手动提交

前面的消费者都是以自动提交 Offset 的方式对 Broker 中的消息进行消费的,但自动提交 可能会出现消息重复消费的情况。

所以在生产环境下,很多时候需要对 Offset 进行手动提交, 以解决重复消费的问题。

手动提交又可以划分为同步提交、异步提交,同异步联合提交。这些提交方式仅仅是 doWork() 方法不相同,其构造器是相同的。

所以下面首先在前面消费者类的基础上进行构造器的修改,然后再分别实现三种不同的提交方式。

同步提交方式是,消费者向 Broker 提交 Offset 后等待 Broker 成功响应。若没有收到响应,则会重新提交,直到获取到响应。

而在这个等待过程中,消费者是阻塞的。其严重影响了消费者的吞吐量。

修改前面的 MyKafkaConsumer.java, 主要修改下面的配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;


public class MyKafkaConsumer extends ShutdownableThread {

private KafkaConsumer<Integer, String> consumer;

public MyKafkaConsumer() {
super("KafkaConsumerTest", false);
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
properties.put("group.id", "mygroup");
// 这里要修改成手动提交
properties.put("enable.auto.commit", "false");
// properties.put("auto.commit.interval.ms", "1000");
properties.put("session.timeout.ms", "30000");
properties.put("heartbeat.interval.ms", "10000");
properties.put("auto.offset.reset", "earliest");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<Integer, String>(properties);
}
@Override
public void doWork() {
consumer.subscribe(Arrays.asList("test2"));
ConsumerRecords<Integer, String>records = consumer.poll(1000);
for (ConsumerRecord record : records) {
System.out.println("topic = " + record.topic());
System.out.println("partition = " + record.partition());
System.out.println("key = " + record.key());
System.out.println("value = " + record.value());

//手动同步提交
consumer.commitSync();
}

}
}

③消费者异步手工提交

手动同步提交方式需要等待 Broker 的成功响应,效率太低,影响消费者的吞吐量。

异步提交方式是,消费者向 Broker 提交 Offset 后不用等待成功响应,所以其增加了消费者的吞吐量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;


public class MyKafkaConsumer extends ShutdownableThread {

private KafkaConsumer<Integer, String> consumer;

public MyKafkaConsumer() {
super("KafkaConsumerTest", false);
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
properties.put("group.id", "mygroup");
// 这里要修改成手动提交
properties.put("enable.auto.commit", "false");
// properties.put("auto.commit.interval.ms", "1000");
properties.put("session.timeout.ms", "30000");
properties.put("heartbeat.interval.ms", "10000");
properties.put("auto.offset.reset", "earliest");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<Integer, String>(properties);
}

@Override
public void doWork() {
consumer.subscribe(Arrays.asList("test2"));
ConsumerRecords<Integer, String>records = consumer.poll(1000);
for (ConsumerRecord record : records) {
System.out.println("topic = " + record.topic());
System.out.println("partition = " + record.partition());
System.out.println("key = " + record.key());
System.out.println("value = " + record.value());

//手动同步提交
// consumer.commitSync();
//手动异步提交
// consumer.commitAsync();
// 带回调公共的手动异步提交
consumer.commitAsync((offsets, e) -> {
if(e != null) {
System.out.println("提交次数, offsets = " + offsets);
System.out.println("exception = " + e);
}
});
}
}
}

Spring Boot 使用 Kafka

现在大家的开发过程中,很多都用的是 Spring Boot 的项目,直接启动了,如果还是用原生的 API,就是有点 Low 了啊,那 Kafka 是如何和 Spring Boot 进行联合的呢?

maven 配置:

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>

添加配置文件,在 application.properties 中加入如下配置信息:

Kafka 连接地址:

1
spring.kafka.bootstrap-servers = 192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094

生产者:

1
2
3
4
5
6
7
spring.kafka.producer.acks = 0
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.retries = 3
spring.kafka.producer.batch-size = 4096
spring.kafka.producer.buffer-memory = 33554432
spring.kafka.producer.compression-type = gzip

消费者:

1
2
3
4
5
6
7
8
9
10
11
spring.kafka.consumer.group-id = mygroup
spring.kafka.consumer.auto-commit-interval = 5000
spring.kafka.consumer.heartbeat-interval = 3000
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto-offset-reset = earliest
spring.kafka.consumer.enable-auto-commit = true
# listenner, 标识消费者监听的个数
spring.kafka.listener.concurrency = 8
# topic的名字
kafka.topic1 = topic1

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;

@Service
@Slf4j
public class MyKafkaProducerServiceImpl implements MyKafkaProducerService {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
// 读取配置文件
@Value("${kafka.topic1}")
private String topic;

@Override
public void sendKafka() {
kafkaTemplate.send(topic, "hell world");
}
}

消费者:

1
2
3
4
5
6
7
8
9
10
@Component
@Slf4j
public class MyKafkaConsumer {
@KafkaListener(topics = "${kafka.topic1}")
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
log.info("----------------- record =" + record);
log.info("------------------ message =" + kafkaMessage.get());
}